package com.atguigu.chapter05;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * Small Flink job that demonstrates the {@code rescale()} partitioning strategy.
 *
 * <p>A parallel source running with parallelism 2 emits the integers 1..8, split
 * between its two subtasks by parity. The stream is then passed through
 * {@code rescale()} into a print sink running with parallelism 4, so the printed
 * subtask prefixes show which downstream subtask received each element.
 */
public class RescaleTest {

    /**
     * Builds and executes the demo pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails during execution
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide default parallelism; the source and the sink override it below.
        env.setParallelism(1);

        env.addSource(new RichParallelSourceFunction<Integer>() {
            // volatile because cancel() is expected to be invoked from a different
            // thread than the one executing run(); without it the loop might never
            // observe the update.
            private volatile boolean running = true;

            /**
             * Emits the values 1..8 whose parity matches this subtask's index:
             * even values on subtask 0, odd values on subtask 1. Stops early if
             * the source has been cancelled.
             */
            @Override
            public void run(SourceContext<Integer> sourceContext) throws Exception {
                // The subtask index does not change while run() executes, so read it once.
                final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

                for (int i = 0; i < 8 && running; i++) {
                    if ((i + 1) % 2 == subtaskIndex) {
                        sourceContext.collect(i + 1);
                    }
                }
            }

            /** Requests that {@link #run} stop emitting as soon as possible. */
            @Override
            public void cancel() {
                running = false;
            }
        }).setParallelism(2).rescale().print().setParallelism(4);

        env.execute();
    }
}
